{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## <font color='green'> Regression & Classification</font>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### <font color='green'> 1. Description</font>\n",
    "Dataset:\n",
    " 1. This dataset explores the factors that affect people's\n",
    " rating of a particular food item.\n",
    " 2. Dataset contains over 20k recipes listed by recipe rating,\n",
    " nutritional information and assigned category (sparse).\n",
    "\n",
    "Objective (Regression):\n",
    " 1. The objective is to find the rating of the food recipe based\n",
    " on its nutritional information and assigned category.\n",
    "\n",
    "Objective (Classification):\n",
    " 1. The objective is to classify each food recipe as highly rated or not, based\n",
    " on its nutritional information and assigned category.\n",
    " 2. A food recipe with a rating greater than 4 is classified as highly rated food (1),\n",
    "    and a food recipe with a rating less than or equal to 4 is classified as average/poorly\n",
    "    rated food (0)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### <font color='green'> 2. Data Preprocessing </font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Importing the standard libraries\n",
    "import time\n",
    "import os\n",
    "import warnings\n",
    "import scipy\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "from collections import OrderedDict\n",
    "from pandas.core.common import SettingWithCopyWarning\n",
    "\n",
    "# Importing modules from Sklearn library\n",
    "from sklearn.preprocessing import MinMaxScaler\n",
    "from sklearn.model_selection import train_test_split\n",
    "\n",
    "# Importing modules from Pyspark library\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "from pyspark.ml.evaluation import RegressionEvaluator\n",
    "from pyspark.ml.evaluation import MulticlassClassificationEvaluator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Pyspark Session Created\n"
     ]
    }
   ],
   "source": [
    "# Start (or reuse) the Spark session shared by the PySpark steps below\n",
    "spark = (\n",
    "    SparkSession.builder\n",
    "    .appName(\"food_recipe\")\n",
    "    .getOrCreate()\n",
    ")\n",
    "print(\"Pyspark Session Created\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "def preprocess_data_regression(filename):\n",
    "    \"\"\"Prepare the recipe data for regression and save the train/test CSVs.\n",
    "\n",
    "    Reads ``filename`` with pandas, renames ``rating`` to ``label`` (and three\n",
    "    columns whose names contain punctuation), drops ``title``, fills missing\n",
    "    label/calories/protein/fat/sodium values with the column median, splits\n",
    "    the rows 80/20 with ``random_state=42`` and min-max scales every column\n",
    "    except ``label`` with a scaler fitted on the training split only.\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    filename : str\n",
    "        Path of the raw recipe CSV.\n",
    "\n",
    "    Returns\n",
    "    -------\n",
    "    None\n",
    "        The splits are written to ``epr_train_reg.csv`` and\n",
    "        ``epr_test_reg.csv`` in the current working directory.\n",
    "    \"\"\"\n",
    "    # Reading the data\n",
    "    data = pd.read_csv(filename)\n",
    "    print(\"Shape of the data: \", data.shape)\n",
    "\n",
    "    # Renaming the target and the columns whose names contain punctuation\n",
    "    data = data.rename(columns={'rating': 'label',\n",
    "                                'st. louis': 'st_luis',\n",
    "                                \"st. patrick's day\": 'st_patrick_day',\n",
    "                                \"washington, d.c.\": \"washington_dc\"})\n",
    "\n",
    "    # Kept so the process-wide warning filter is the same as before; the\n",
    "    # explicit copies of the splits below avoid the warning at its source.\n",
    "    warnings.simplefilter(action=\"ignore\", category=SettingWithCopyWarning)\n",
    "\n",
    "    # Typecasting the columns and removing the title column\n",
    "    data = data.astype({'label': 'float64', 'calories': 'float64'})\n",
    "    data = data.drop(columns=['title'])\n",
    "\n",
    "    # Replacing the null values of columns with their respective medians.\n",
    "    # Assigning the result back (instead of a chained inplace fillna) keeps\n",
    "    # working when pandas Copy-on-Write is enabled.\n",
    "    median_filled = ['label', 'calories', 'protein', 'fat', 'sodium']\n",
    "    data[median_filled] = data[median_filled].fillna(data[median_filled].median())\n",
    "\n",
    "    # Splitting the data into train_data (80%) and test_data (20%).\n",
    "    # Independent copies make the in-place column assignments below safe.\n",
    "    train_data, test_data = train_test_split(data, test_size=0.20, random_state=42)\n",
    "    train_data = train_data.copy()\n",
    "    test_data = test_data.copy()\n",
    "\n",
    "    # Scaling every feature column; the target is excluded by name rather\n",
    "    # than by position so the result does not depend on the column order.\n",
    "    scaler = MinMaxScaler()\n",
    "    scaling_columns = train_data.columns.drop('label')\n",
    "\n",
    "    # The scaler is fitted on the training split only and reused on the test split\n",
    "    train_data[scaling_columns] = scaler.fit_transform(train_data[scaling_columns])\n",
    "    test_data[scaling_columns] = scaler.transform(test_data[scaling_columns])\n",
    "\n",
    "    # Saving the test_data and train_data as CSVs\n",
    "    train_data.to_csv(\"epr_train_reg.csv\", index=False)\n",
    "    test_data.to_csv(\"epr_test_reg.csv\", index=False)\n",
    "\n",
    "    print(\"Shape of the training data: \", train_data.shape)\n",
    "    print(\"Shape of the testing data: \", test_data.shape)\n",
    "\n",
    "    print(\"Training data and testing data created successfully. \\n\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def preprocess_data_classification(filename):\n",
    "    \"\"\"Prepare the recipe data for classification and save the train/test CSVs.\n",
    "\n",
    "    Reads ``filename`` with pandas, renames ``rating`` to ``label`` (and three\n",
    "    columns whose names contain punctuation), drops ``title``, fills missing\n",
    "    label/calories/protein/fat/sodium values with the column median and\n",
    "    replaces ``label`` by the binary target ``new_label`` (1.0 when the label\n",
    "    is greater than 4, otherwise 0.0), which ends up as the last column.\n",
    "    The rows are then split 80/20 with ``random_state=42`` and every column\n",
    "    except ``new_label`` is min-max scaled with a scaler fitted on the\n",
    "    training split only.\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    filename : str\n",
    "        Path of the raw recipe CSV.\n",
    "\n",
    "    Returns\n",
    "    -------\n",
    "    None\n",
    "        The splits are written to ``epr_train_cl.csv`` and\n",
    "        ``epr_test_cl.csv`` in the current working directory.\n",
    "    \"\"\"\n",
    "    # Reading the data\n",
    "    data = pd.read_csv(filename)\n",
    "    print(\"Shape of the data: \", data.shape)\n",
    "\n",
    "    # Renaming the target and the columns whose names contain punctuation\n",
    "    data = data.rename(columns={'rating': 'label',\n",
    "                                'st. louis': 'st_luis',\n",
    "                                \"st. patrick's day\": 'st_patrick_day',\n",
    "                                \"washington, d.c.\": \"washington_dc\"})\n",
    "\n",
    "    # Kept so the process-wide warning filter is the same as before; the\n",
    "    # explicit copies of the splits below avoid the warning at its source.\n",
    "    warnings.simplefilter(action=\"ignore\", category=SettingWithCopyWarning)\n",
    "\n",
    "    # Typecasting the columns and removing the title column\n",
    "    data = data.astype({'label': 'float64', 'calories': 'float64'})\n",
    "    data = data.drop(columns=['title'])\n",
    "\n",
    "    # Replacing the null values of columns with their respective medians.\n",
    "    # Assigning the result back (instead of a chained inplace fillna) keeps\n",
    "    # working when pandas Copy-on-Write is enabled.\n",
    "    median_filled = ['label', 'calories', 'protein', 'fat', 'sodium']\n",
    "    data[median_filled] = data[median_filled].fillna(data[median_filled].median())\n",
    "\n",
    "    # Converting the label <= 4 to 0 and label > 4 to 1, then removing the\n",
    "    # old label column so that new_label is the last column\n",
    "    data['new_label'] = (data['label'] > 4).astype('float64')\n",
    "    data = data.drop(columns=['label'])\n",
    "\n",
    "    # Splitting the data into train_data (80%) and test_data (20%).\n",
    "    # Independent copies make the in-place column assignments below safe.\n",
    "    train_data, test_data = train_test_split(data, test_size=0.20, random_state=42)\n",
    "    train_data = train_data.copy()\n",
    "    test_data = test_data.copy()\n",
    "\n",
    "    # Scaling every feature column. The target is excluded by name: the\n",
    "    # positional slice columns[1:] used previously skipped the first feature\n",
    "    # and scaled new_label instead, because the target is the LAST column here.\n",
    "    scaler = MinMaxScaler()\n",
    "    scaling_columns = train_data.columns.drop('new_label')\n",
    "\n",
    "    # The scaler is fitted on the training split only and reused on the test split\n",
    "    train_data[scaling_columns] = scaler.fit_transform(train_data[scaling_columns])\n",
    "    test_data[scaling_columns] = scaler.transform(test_data[scaling_columns])\n",
    "\n",
    "    # Saving the test_data and train_data as CSVs\n",
    "    train_data.to_csv(\"epr_train_cl.csv\", index=False)\n",
    "    test_data.to_csv(\"epr_test_cl.csv\", index=False)\n",
    "\n",
    "    print(\"Shape of the training data: \", train_data.shape)\n",
    "    print(\"Shape of the testing data: \", test_data.shape)\n",
    "\n",
    "    print(\"Training data and testing data created successfully. \\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### <font color='green'> 3. Algorithm Evaluation </font>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Result accumulators: evaluate_regression / evaluate_classifier append one\n",
    "# entry to each list per run, and the summary cell tabulates them.\n",
    "estimator_name = []\n",
    "train_time, test_time = [], []\n",
    "train_score, test_score = [], []"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "def evaluate_regression(estimator, estimator_nm, train_data, test_data):\n",
    "    \"\"\"Fit a regressor, score it on both splits and record the run.\n",
    "\n",
    "    The backend is picked from ``estimator_nm``: a name containing\n",
    "    \"pyspark\" takes the Spark ML path (scored with the ``r2`` metric of\n",
    "    ``RegressionEvaluator``); otherwise a name containing \"frovedis\" takes\n",
    "    the ``fit``/``score`` path on scipy CSR matrices.\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    estimator : object\n",
    "        Unfitted estimator of the selected backend.\n",
    "    estimator_nm : str\n",
    "        Display name of the run; must contain \"pyspark\" or \"frovedis\".\n",
    "    train_data, test_data : DataFrame\n",
    "        Spark DataFrames on the pyspark path, pandas DataFrames on the\n",
    "        frovedis path. Both need a ``label`` column; on the pyspark path\n",
    "        every column after the first is used as a feature, so ``label`` is\n",
    "        expected to be the first column.\n",
    "\n",
    "    Returns\n",
    "    -------\n",
    "    None\n",
    "        One entry is appended to each of the module-level lists\n",
    "        ``estimator_name``, ``train_time``, ``test_time``, ``train_score``\n",
    "        and ``test_score``. Nothing is appended if the run fails, so the\n",
    "        lists always stay the same length.\n",
    "\n",
    "    Raises\n",
    "    ------\n",
    "    ValueError\n",
    "        If ``estimator_nm`` names neither backend.\n",
    "    \"\"\"\n",
    "    if \"pyspark\" in estimator_nm:\n",
    "        # Pack the feature columns into the single vector column read by Spark ML\n",
    "        assembler = VectorAssembler(inputCols=train_data.columns[1:], outputCol=\"Features\")\n",
    "        train_vectors = assembler.transform(train_data).select(\"Features\", \"label\")\n",
    "        test_vectors = assembler.transform(test_data).select(\"Features\", \"label\")\n",
    "\n",
    "        started = time.time()\n",
    "        model = estimator.fit(train_vectors)\n",
    "        fit_seconds = round(time.time() - started, 4)\n",
    "\n",
    "        # The scoring timer covers prediction and evaluation on both splits\n",
    "        started = time.time()\n",
    "        r2_evaluator = RegressionEvaluator(labelCol=\"label\", predictionCol=\"prediction\", metricName=\"r2\")\n",
    "        score_on_train = r2_evaluator.evaluate(model.transform(train_vectors))\n",
    "        score_on_test = r2_evaluator.evaluate(model.transform(test_vectors))\n",
    "        score_seconds = round(time.time() - started, 4)\n",
    "\n",
    "    elif \"frovedis\" in estimator_nm:\n",
    "        # Imported explicitly: a bare `import scipy` does not guarantee that\n",
    "        # the scipy.sparse submodule is loaded.\n",
    "        from scipy.sparse import csr_matrix\n",
    "\n",
    "        # Features as sparse matrices, ratings (target variable) as Series\n",
    "        X_train = csr_matrix(train_data.drop(columns=[\"label\"]).values)\n",
    "        X_test = csr_matrix(test_data.drop(columns=[\"label\"]).values)\n",
    "        y_train = train_data[\"label\"]\n",
    "        y_test = test_data[\"label\"]\n",
    "\n",
    "        started = time.time()\n",
    "        estimator.fit(X_train, y_train)\n",
    "        fit_seconds = round(time.time() - started, 4)\n",
    "\n",
    "        # The scoring timer covers scoring on both splits\n",
    "        started = time.time()\n",
    "        score_on_train = estimator.score(X_train, y_train)\n",
    "        score_on_test = estimator.score(X_test, y_test)\n",
    "        score_seconds = round(time.time() - started, 4)\n",
    "\n",
    "    else:\n",
    "        raise ValueError(\"estimator_nm must contain 'pyspark' or 'frovedis', got %r\" % (estimator_nm,))\n",
    "\n",
    "    # Record the run only after everything succeeded, so a failure cannot\n",
    "    # leave the accumulator lists with different lengths.\n",
    "    estimator_name.append(estimator_nm)\n",
    "    train_time.append(fit_seconds)\n",
    "    test_time.append(score_seconds)\n",
    "    train_score.append(score_on_train)\n",
    "    test_score.append(score_on_test)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "def evaluate_classifier(estimator, estimator_nm, train_data, test_data):\n",
    "    \"\"\"Fit a classifier, score it on both splits and record the run.\n",
    "\n",
    "    The backend is picked from ``estimator_nm``: a name containing\n",
    "    \"pyspark\" takes the Spark ML path (scored with the ``accuracy`` metric\n",
    "    of ``MulticlassClassificationEvaluator``); otherwise a name containing\n",
    "    \"frovedis\" takes the ``fit``/``score`` path on scipy CSR matrices.\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    estimator : object\n",
    "        Unfitted estimator of the selected backend.\n",
    "    estimator_nm : str\n",
    "        Display name of the run; must contain \"pyspark\" or \"frovedis\".\n",
    "    train_data, test_data : DataFrame\n",
    "        Spark DataFrames on the pyspark path, pandas DataFrames on the\n",
    "        frovedis path. Both need a ``new_label`` column; on the pyspark path\n",
    "        every column before the last is used as a feature, so ``new_label``\n",
    "        is expected to be the last column.\n",
    "\n",
    "    Returns\n",
    "    -------\n",
    "    None\n",
    "        One entry is appended to each of the module-level lists\n",
    "        ``estimator_name``, ``train_time``, ``test_time``, ``train_score``\n",
    "        and ``test_score``. Nothing is appended if the run fails, so the\n",
    "        lists always stay the same length.\n",
    "\n",
    "    Raises\n",
    "    ------\n",
    "    ValueError\n",
    "        If ``estimator_nm`` names neither backend.\n",
    "    \"\"\"\n",
    "    if \"pyspark\" in estimator_nm:\n",
    "        # Pack the feature columns into the single vector column read by Spark ML\n",
    "        assembler = VectorAssembler(inputCols=train_data.columns[:-1], outputCol=\"Features\")\n",
    "        train_vectors = assembler.transform(train_data).select(\"Features\", \"new_label\")\n",
    "        test_vectors = assembler.transform(test_data).select(\"Features\", \"new_label\")\n",
    "\n",
    "        started = time.time()\n",
    "        model = estimator.fit(train_vectors)\n",
    "        fit_seconds = round(time.time() - started, 4)\n",
    "\n",
    "        # The scoring timer covers prediction and evaluation on both splits\n",
    "        started = time.time()\n",
    "        accuracy_evaluator = MulticlassClassificationEvaluator(\n",
    "            labelCol=\"new_label\", predictionCol=\"prediction\", metricName=\"accuracy\")\n",
    "        score_on_train = accuracy_evaluator.evaluate(model.transform(train_vectors))\n",
    "        score_on_test = accuracy_evaluator.evaluate(model.transform(test_vectors))\n",
    "        score_seconds = round(time.time() - started, 4)\n",
    "\n",
    "    elif \"frovedis\" in estimator_nm:\n",
    "        # Imported explicitly: a bare `import scipy` does not guarantee that\n",
    "        # the scipy.sparse submodule is loaded.\n",
    "        from scipy.sparse import csr_matrix\n",
    "\n",
    "        # Features as sparse matrices, class labels (target variable) as Series\n",
    "        X_train = csr_matrix(train_data.drop(columns=[\"new_label\"]).values)\n",
    "        X_test = csr_matrix(test_data.drop(columns=[\"new_label\"]).values)\n",
    "        y_train = train_data[\"new_label\"]\n",
    "        y_test = test_data[\"new_label\"]\n",
    "\n",
    "        started = time.time()\n",
    "        estimator.fit(X_train, y_train)\n",
    "        fit_seconds = round(time.time() - started, 4)\n",
    "\n",
    "        # The scoring timer covers scoring on both splits\n",
    "        started = time.time()\n",
    "        score_on_train = estimator.score(X_train, y_train)\n",
    "        score_on_test = estimator.score(X_test, y_test)\n",
    "        score_seconds = round(time.time() - started, 4)\n",
    "\n",
    "    else:\n",
    "        raise ValueError(\"estimator_nm must contain 'pyspark' or 'frovedis', got %r\" % (estimator_nm,))\n",
    "\n",
    "    # Record the run only after everything succeeded, so a failure cannot\n",
    "    # leave the accumulator lists with different lengths.\n",
    "    estimator_name.append(estimator_nm)\n",
    "    train_time.append(fit_seconds)\n",
    "    test_time.append(score_seconds)\n",
    "    train_score.append(score_on_train)\n",
    "    test_score.append(score_on_test)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Shape of the data:  (20052, 680)\n",
      "Shape of the training data:  (16041, 679)\n",
      "Shape of the testing data:  (4011, 679)\n",
      "Training data and testing data created successfully. \n",
      "\n"
     ]
    }
   ],
   "source": [
    "#---- Data Preparation for Regression----\n",
    "\n",
    "# Files written by preprocess_data_regression\n",
    "train_data_nm_reg = 'epr_train_reg.csv'\n",
    "test_data_nm_reg = 'epr_test_reg.csv'\n",
    "preprocess_data_regression('datasets/epi_r.csv')\n",
    "\n",
    "# Spark DataFrames for the PySpark regressor\n",
    "train_data_pyspark = spark.read.csv(train_data_nm_reg, header=True, inferSchema=True)\n",
    "test_data_pyspark = spark.read.csv(test_data_nm_reg, header=True, inferSchema=True)\n",
    "\n",
    "# pandas DataFrames for the Frovedis regressor\n",
    "train_df = pd.read_csv(train_data_nm_reg)\n",
    "test_df = pd.read_csv(test_data_nm_reg)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Shape of the data:  (20052, 680)\n",
      "Shape of the training data:  (16041, 679)\n",
      "Shape of the testing data:  (4011, 679)\n",
      "Training data and testing data created successfully. \n",
      "\n"
     ]
    }
   ],
   "source": [
    "#---- Data Preparation for Classification----\n",
    "\n",
    "# Files written by preprocess_data_classification\n",
    "train_data_nm_cl = 'epr_train_cl.csv'\n",
    "test_data_nm_cl = 'epr_test_cl.csv'\n",
    "preprocess_data_classification('datasets/epi_r.csv')\n",
    "\n",
    "# Spark DataFrames for the PySpark classifier\n",
    "train_data_pyspark_cl = spark.read.csv(train_data_nm_cl, header=True, inferSchema=True)\n",
    "test_data_pyspark_cl = spark.read.csv(test_data_nm_cl, header=True, inferSchema=True)\n",
    "\n",
    "# pandas DataFrames for the Frovedis classifier\n",
    "train_df_cl = pd.read_csv(train_data_nm_cl)\n",
    "test_df_cl = pd.read_csv(test_data_nm_cl)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 3.1 FactorizationMachineRegressor"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "target = \"factorization_machines_regressor\"\n",
    "\n",
    "# ---- Frovedis run ----\n",
    "import frovedis\n",
    "from frovedis.exrpc.server import FrovedisServer\n",
    "from frovedis.mllib.fm import FactorizationMachineRegressor as fFMR\n",
    "\n",
    "# The server command comes from the FROVEDIS_SERVER environment variable\n",
    "FrovedisServer.initialize(\"mpirun -np 8 \" + os.environ[\"FROVEDIS_SERVER\"])\n",
    "try:\n",
    "    f_est = fFMR()\n",
    "    e_nm = target + \"_frovedis_\" + frovedis.__version__\n",
    "    evaluate_regression(f_est, e_nm, train_df, test_df)\n",
    "    f_est.release()\n",
    "finally:\n",
    "    # Shut the server down even when the evaluation above raises, so a\n",
    "    # failed run does not leave the server processes behind.\n",
    "    FrovedisServer.shut_down()\n",
    "\n",
    "# ---- PySpark run ----\n",
    "import pyspark\n",
    "from pyspark.ml.regression import FMRegressor as pFMR\n",
    "\n",
    "p_est = pFMR(featuresCol=\"Features\", stepSize=0.01)\n",
    "e_nm = target + \"_pyspark_\" + pyspark.__version__\n",
    "evaluate_regression(p_est, e_nm, train_data_pyspark, test_data_pyspark)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### 3.2 FactorizationMachineClassifier"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "target = \"factorization_machines_classifier\"\n",
    "\n",
    "# ---- Frovedis run ----\n",
    "import frovedis\n",
    "from frovedis.exrpc.server import FrovedisServer\n",
    "from frovedis.mllib.fm import FactorizationMachineClassifier as fFMC\n",
    "\n",
    "# The server command comes from the FROVEDIS_SERVER environment variable\n",
    "FrovedisServer.initialize(\"mpirun -np 8 \" + os.environ[\"FROVEDIS_SERVER\"])\n",
    "try:\n",
    "    f_est = fFMC()\n",
    "    e_nm = target + \"_frovedis_\" + frovedis.__version__\n",
    "    evaluate_classifier(f_est, e_nm, train_df_cl, test_df_cl)\n",
    "    f_est.release()\n",
    "finally:\n",
    "    # Shut the server down even when the evaluation above raises, so a\n",
    "    # failed run does not leave the server processes behind.\n",
    "    FrovedisServer.shut_down()\n",
    "\n",
    "# ---- PySpark run ----\n",
    "import pyspark\n",
    "from pyspark.ml.classification import FMClassifier as pFMC\n",
    "\n",
    "p_est = pFMC(labelCol=\"new_label\", featuresCol=\"Features\")\n",
    "e_nm = target + \"_pyspark_\" + pyspark.__version__\n",
    "evaluate_classifier(p_est, e_nm, train_data_pyspark_cl, test_data_pyspark_cl)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "                                           estimator  train time  test time  \\\n",
      "0   factorization_machines_regressor_frovedis_0.9.10      1.0167     0.0580   \n",
      "1     factorization_machines_regressor_pyspark_3.0.2     10.4213     1.0614   \n",
      "2  factorization_machines_classifier_frovedis_0.9.10      1.0260     0.0826   \n",
      "3    factorization_machines_classifier_pyspark_3.0.2      8.0632     0.9113   \n",
      "\n",
      "   train-score  test-score  \n",
      "0     0.155742    0.089252  \n",
      "1     0.039614   -0.233533  \n",
      "2     0.465806    0.459237  \n",
      "3     0.560252    0.541511  \n"
     ]
    }
   ],
   "source": [
    "# Tabulate one row per evaluated estimator from the accumulator lists\n",
    "summary_columns = OrderedDict()\n",
    "summary_columns[\"estimator\"] = estimator_name\n",
    "summary_columns[\"train time\"] = train_time\n",
    "summary_columns[\"test time\"] = test_time\n",
    "summary_columns[\"train-score\"] = train_score\n",
    "summary_columns[\"test-score\"] = test_score\n",
    "\n",
    "summary = pd.DataFrame(summary_columns)\n",
    "print(summary)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
